package com.study.mr.reducejoin;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import java.io.File;
import java.io.IOException;

public class ReduceJoinMapper extends Mapper<LongWritable,Text,Text,Orderpd> {
    private Text outk=new Text();
    private Orderpd outv=new Orderpd();
    private FileSplit inputSplit;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        inputSplit = (FileSplit) context.getInputSplit();
    }

    /**
     * 核心业务逻辑。将两个需要作关联的文件进行搜集
     * @param key
     * @param value
     * @param context
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        //获取当前行数据
        String line = value.toString();
        //切割当前行数据 pd:01 小米  order:1001	01	1
        String[] datas = line.split("\t");
        //将当前数据封装到Orderpd
        if (inputSplit.getPath().getName().contains("order")) {
            //当前行数据来源于order.txt文件 1001	01	1
            //封装输出数据的key
            outk.set(datas[1]);
            //封装输出数据的value
            outv.setOrderId(datas[0]);
            outv.setPid(datas[1]);
            outv.setAmount(Integer.parseInt(datas[2]));
            outv.setPname("");
            outv.setTitle("order");

        }else {
            //数据来源于pd.txt文件:01 小米
            outk.set(datas[0]);
            //封装输出数据的value
            outv.setOrderId("");
            outv.setPid(datas[0]);
            outv.setAmount(0);
            outv.setPname(datas[1]);
            outv.setTitle("pd");
        }
        //将数据写出
        context.write(outk,outv);

    }
}
